//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// insert_executor.cpp
//
// Identification: src/execution/insert_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include <algorithm>
#include <memory>

#include "execution/executors/insert_executor.h"

namespace bustub {

InsertExecutor::InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                               std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      child_executor_(std::move(child_executor)),
      table_info_(exec_ctx_->GetCatalog()->GetTable(plan_->TableOid())) {}

void InsertExecutor::Init() {
  child_executor_->Init();
  rows_ = 0;
  return_trued_ = false;
  LOG_DEBUG("Init");
  auto oid = plan_->TableOid();
  auto txn = exec_ctx_->GetTransaction();
  auto lock_mgr = exec_ctx_->GetLockManager();
  lock_mgr->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, oid);
}

void InsertExecutor::NextResult(Tuple *tuple, RID *rid) {
  Value value = {INTEGER, rows_};
  Column col = {"foo", INTEGER};
  auto dummy_schema = Schema{{col}};
  *tuple = Tuple{{value}, &GetOutputSchema()};
}

auto InsertExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool {
  // LOG_DEBUG("Next0");
  while (child_executor_->Next(tuple, rid)) {
    // 插入
    auto tuple_meta = TupleMeta();
    tuple_meta.is_deleted_ = false;
    auto transaction = exec_ctx_->GetTransaction();
    auto rid_or_none = table_info_->table_->InsertTuple(tuple_meta, *tuple, exec_ctx_->GetLockManager(), transaction,
                                                        table_info_->oid_);
    // LOG_DEBUG("Next2");
    if (!rid_or_none.has_value()) {
      NextResult(tuple, rid);
      return false;
    }

    auto oid = plan_->TableOid();
    auto txn = exec_ctx_->GetTransaction();
    auto lock_mgr = exec_ctx_->GetLockManager();
    try {
      bool res = lock_mgr->LockRow(txn, LockManager::LockMode::EXCLUSIVE, oid, rid_or_none.value());
      if (!res) {
        throw ExecutionException("insert");
      }
    } catch (TransactionAbortException) {
      throw ExecutionException("insert");
    }

    // 更新写集
    TableWriteRecord w_record{oid, rid_or_none.value(), table_info_->table_.get()};
    w_record.wtype_ = WType::INSERT;
    txn->AppendTableWriteRecord(w_record);

    // 插入索引
    auto indexs = exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_);
    bool insert_result = true;
    for (auto &&index_info : indexs) {
      // LOG_DEBUG("index=%s", index_info->name_.c_str());
      insert_result = index_info->index_->InsertEntry(
          tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index_info->key_schema_,
                              index_info->index_->GetKeyAttrs()),
          rid_or_none.value(), transaction);
      if (!insert_result) {
        NextResult(tuple, rid);
        // LOG_DEBUG("Next4");
        try {
          bool res = lock_mgr->UnlockRow(txn, oid, rid_or_none.value());
          if (!res) {
            throw ExecutionException("insert");
          }
        } catch (TransactionAbortException) {
          throw ExecutionException("insert");
        }
        return false;
      }
      // 更新写集
      IndexWriteRecord idx_w_record{rid_or_none.value(),    oid, WType::INSERT, *tuple, index_info->index_oid_,
                                    exec_ctx_->GetCatalog()};
      txn->AppendIndexWriteRecord(idx_w_record);
    }
    rows_++;
  }
  NextResult(tuple, rid);
  // LOG_DEBUG("Next5");
  bool ret = false;
  ret = !return_trued_;
  return_trued_ = true;
  return ret;
}

}  // namespace bustub
